d05d5d797b507fe2176df9199cd884dc5203df05,src/main/java/com/github/ddth/kafka/internal/KafkaHelper.java,KafkaHelper,seekToEnd,#KafkaConsumer#String#,64

Before Change


                List<PartitionInfo> partInfo = consumer.partitionsFor(topic);
                for (PartitionInfo p : partInfo) {
                    TopicPartition tp = new TopicPartition(topic, p.partition());
                    consumer.assign(Arrays.asList(tp));
                    consumer.seekToEnd(tp);
                    consumer.position(tp);
                    consumer.commitSync();
                }
            } finally {
                consumer.unsubscribe();

After Change


     * @param consumer
     * @param topic
     */
    public static void seekToEnd(final KafkaConsumer<?, ?> consumer, final String topic) {
        synchronized (consumer) {
            // first, save the current subscription
            Set<String> subscription = consumer.subscription();
            try {
                // second, unsubscribe, then manually assign all partitions of the topic.
                consumer.unsubscribe();
                List<PartitionInfo> partInfo = consumer.partitionsFor(topic);
                Collection<TopicPartition> tpList = new ArrayList<>();
                for (PartitionInfo p : partInfo) {
                    TopicPartition tp = new TopicPartition(topic, p.partition());
                    tpList.add(tp);
                }
                consumer.assign(tpList);
                consumer.seekToEnd(tpList);
                // we want to seek as soon as possible
                for (TopicPartition tp : tpList) {
                    // seekToEnd evaluates lazily, so invoke position() to force
                    // the seek to take effect before offsets are committed.
                    consumer.position(tp);
                }
                consumer.commitSync();
            } finally {
                // finally, restore subscription
                consumer.unsubscribe();